package com.bujunjie.study.realtime.dwd.db.app;

import com.bujunjie.study.realtime.common.base.BaseSQLApp;
import com.bujunjie.study.realtime.common.constant.FlinkConstant;
import com.bujunjie.study.realtime.common.util.SQLUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * <p>DWD trade order detail job.</p>
 *
 * <p>Selects the insert records of {@code order_detail}, {@code order_info},
 * {@code order_detail_activity} and {@code order_detail_coupon} from {@code topic_db},
 * joins them on the order detail row and inserts the result into the table named by
 * {@link FlinkConstant#TOPIC_DWD_TRADE_ORDER_DETAIL}.</p>
 *
 * @author bu.junjie
 * @version 1.0.0
 * @createTime 2025/9/17 13:48
 */
public class DwdTradeOrderDetail extends BaseSQLApp {

    /**
     * Job entry point: creates the application and calls {@code start} with this job's settings.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        DwdTradeOrderDetail app = new DwdTradeOrderDetail();
        app.start(10014, 4, FlinkConstant.TOPIC_DWD_TRADE_ORDER_DETAIL);
    }

    /**
     * Builds the order detail pipeline: registers the four filtered source views,
     * joins them, creates the sink table and inserts the joined rows into it.
     *
     * @param tableEnv table environment the queries and the sink table are registered in
     */
    @Override
    public void handle(StreamTableEnvironment tableEnv) {
        final String sinkName = FlinkConstant.TOPIC_DWD_TRADE_ORDER_DETAIL;

        // Keep idle state for 10 seconds.
        Duration idleStateRetention = Duration.ofSeconds(10);
        tableEnv.getConfig().setIdleStateRetention(idleStateRetention);

        // 1. Read the data from ODS.
        // NOTE(review): presumably this is what registers `topic_db` queried below — confirm in BaseSQLApp.
        this.readOdsDb(tableEnv, sinkName);

        // 2. Order detail rows.
        registerInsertView(tableEnv, "order_detail",
                "data['id'] id," +
                        "data['order_id'] order_id," +
                        "data['sku_id'] sku_id," +
                        "data['sku_name'] sku_name," +
                        "data['create_time'] create_time," +
                        "data['source_id'] source_id," +
                        "data['source_type'] source_type," +
                        "data['sku_num'] sku_num," +
                        // split original amount = sku_num * order_price, cast back to string
                        "cast(cast(data['sku_num'] as decimal(16,2)) * " +
                        "   cast(data['order_price'] as decimal(16,2)) as String) split_original_amount," +
                        // split total amount
                        "data['split_total_amount'] split_total_amount," +
                        // split activity amount
                        "data['split_activity_amount'] split_activity_amount," +
                        // split coupon amount
                        "data['split_coupon_amount'] split_coupon_amount," +
                        "ts ");

        // 3. Order header rows.
        registerInsertView(tableEnv, "order_info",
                "data['id'] id," +
                        "data['user_id'] user_id," +
                        "data['province_id'] province_id ");

        // 4. Order detail / activity association rows.
        registerInsertView(tableEnv, "order_detail_activity",
                "data['order_detail_id'] order_detail_id, " +
                        "data['activity_id'] activity_id, " +
                        "data['activity_rule_id'] activity_rule_id ");

        // 5. Order detail / coupon association rows.
        registerInsertView(tableEnv, "order_detail_coupon",
                "data['order_detail_id'] order_detail_id, " +
                        "data['coupon_id'] coupon_id ");

        // 6. Join the four views: inner join to the order header,
        //    left joins to activity and coupon.
        String joinSql = "select " +
                "od.id," +
                "od.order_id," +
                "oi.user_id," +
                "od.sku_id," +
                "od.sku_name," +
                "oi.province_id," +
                "act.activity_id," +
                "act.activity_rule_id," +
                "cou.coupon_id," +
                // calendar day (year-month-day) taken from create_time
                "date_format(od.create_time, 'yyyy-MM-dd') date_id," +
                "od.create_time," +
                "od.sku_num," +
                "od.split_original_amount," +
                "od.split_activity_amount," +
                "od.split_coupon_amount," +
                "od.split_total_amount," +
                "od.ts " +
                "from order_detail od " +
                "join order_info oi on od.order_id=oi.id " +
                "left join order_detail_activity act " +
                "on od.id=act.order_detail_id " +
                "left join order_detail_coupon cou " +
                "on od.id=cou.order_detail_id ";
        Table joined = tableEnv.sqlQuery(joinSql);

        // 7. Create the sink table; the connector clause comes from SQLUtil.getUpsertKafkaDDL.
        String sinkDdl = "create table " + sinkName + "(" +
                "id string," +
                "order_id string," +
                "user_id string," +
                "sku_id string," +
                "sku_name string," +
                "province_id string," +
                "activity_id string," +
                "activity_rule_id string," +
                "coupon_id string," +
                "date_id string," +
                "create_time string," +
                "sku_num string," +
                "split_original_amount string," +
                "split_activity_amount string," +
                "split_coupon_amount string," +
                "split_total_amount string," +
                "ts bigint," +
                "primary key(id) not enforced " +
                ")" + SQLUtil.getUpsertKafkaDDL(sinkName);
        tableEnv.executeSql(sinkDdl);

        // 8. Write the joined rows into the order detail sink table.
        joined.executeInsert(sinkName);
    }

    /**
     * Selects the given columns from the {@code topic_db} rows whose {@code table} equals
     * {@code sourceTable} and whose {@code type} is {@code insert}, and registers the result
     * as a temporary view named {@code sourceTable}.
     *
     * @param tableEnv    table environment to run the query and register the view in
     * @param sourceTable value matched against the {@code table} column; also used as the view name
     * @param columns     select list, ending with a single space so the {@code from} clause can follow
     */
    private static void registerInsertView(StreamTableEnvironment tableEnv,
                                           String sourceTable,
                                           String columns) {
        Table inserts = tableEnv.sqlQuery("select " +
                columns +
                "from topic_db " +
                "where `table`='" + sourceTable + "' " +
                "and `type`='insert' ");
        tableEnv.createTemporaryView(sourceTable, inserts);
    }
}
